Snowflake×dbtを試してみた~Part4:パイプライン構築編その2~ #SnowflakeDB #dbt
※本エントリは、Snowflakeをより使いこなそう! Advent Calendar 2021の22日目の記事となります。
さがらです。
Snowflake公式のdbtと連携した時の機能を一通り試すことが出来るQUICKSTARTSに関して試してみた内容をまとめていきます。※この記事は「Part4:パイプライン構築編その2」となります。
この記事の内容について
Snowflake公式のQUICKSTARTSに、Accelerating Data Teams with dbt & SnowflakeというSnowflakeとdbtを組み合わせたときの利点を一通り試すことが出来るクイックスタートがあります。
こちらの内容について、以下の合計5本の構成で試してみた内容を書いていきます。
- Part1:基本設定編
- 「1.Overview」から「5. dbt Configuration」
- Part2:Project設定編
- 「6. dbt Project Configuration」
- Part3:パイプライン構築編その1
- 「7. Building dbt Data Pipelines」から「9. dbt pipelines - Intermediate」
- Part4:パイプライン構築編その2 ※この記事です。
- 「10. dbt pipelines - Seeds」から「12. dbt pipelines - Facts」
- Part5:テスト&Doc&デプロイ編 ※12/23に公開予定、公開後にリンクも正常に動作します。
- 「13. dbt pipelines - Tests & Docs」から「16. Appendix」
この記事では、「パイプライン構築編その2」ということで「10. dbt pipelines - Seeds」から「12. dbt pipelines - Facts」の内容についてまとめていきます。
10. dbt pipelines - Seeds
ここでは、トレーディングブックに関するcsv形式でデータを2つ用意し、dbt seed
機能を使ってアップロードしていきます。ちなみにトレーディングブックの内容としては、AAPL株を売買し、支払/受取の現金を異なる通貨(USDとGBP)で記録しています。
csvデータの準備
まず、csvのデータを2つ作成していきます。
New File
から、名前をseeds/manual_book1.csv
に書き換え、新しく作成してください。※公式のページでは「data」という新しいフォルダを定義していましたが、この手順通りだと後のdbt seed
コマンドが正しく動かないため注意してください。
作成したmanual_book1.csv
に対して、以下の内容をコピーして貼り付け、右上のsave
を押してください。
Book,Date,Trader,Instrument,Action,Cost,Currency,Volume,Cost_Per_Share,Stock_exchange_name B2020SW1,2021-03-03,Jeff A.,AAPL,BUY,-17420,GBP,200,87.1,NASDAQ B2020SW1,2021-03-03,Jeff A.,AAPL,BUY,-320050,GBP,3700,86.5,NASDAQ B2020SW1,2021-01-26,Jeff A.,AAPL,SELL,52500,GBP,-500,105,NASDAQ B2020SW1,2021-01-22,Jeff A.,AAPL,BUY,-100940,GBP,980,103,NASDAQ B2020SW1,2021-01-22,Nick Z.,AAPL,SELL,5150,GBP,-50,103,NASDAQ B2020SW1,2019-08-31,Nick Z.,AAPL,BUY,-9800,GBP,100,98,NASDAQ B2020SW1,2019-08-31,Nick Z.,AAPL,BUY,-1000,GBP,50,103,NASDAQ
続いて、もう1つのcsvデータを定義していきます。
New File
から、名前をseeds/manual_book2.csv
に書き換え、新しく作成してください。
作成したmanual_book2.csv
に対して、以下の内容をコピーして貼り付け、右上のsave
を押してください。
Book,Date,Trader,Instrument,Action,Cost,Currency,Volume,Cost_Per_Share,Stock_exchange_name B-EM1,2021-03-03,Tina M.,AAPL,BUY,-17420,EUR,200,87.1,NASDAQ B-EM1,2021-03-03,Tina M.,AAPL,BUY,-320050,EUR,3700,86.5,NASDAQ B-EM1,2021-01-22,Tina M.,AAPL,BUY,-100940,EUR,980,103,NASDAQ B-EM1,2021-01-22,Tina M.,AAPL,BUY,-100940,EUR,980,103,NASDAQ B-EM1,2019-08-31,Tina M.,AAPL,BUY,-9800,EUR,100,98,NASDAQ
dbt seedによるデータの取り込み
続いて、先程定義した2つのcsvのデータをSnowflakeに対してロードしていきます。
コマンドラインにdbt seed
と入力し、実行してください。
※補足として、このdbt seed
で数十万行のデータを読み込むことは可能ですが、大規模なデータセット用に作成されたものではないので、そのような場合はCOPY/SnowpipeなどSnowflakeで推奨されている方法でロードを行ってください。
下図のように表示されれば、実行完了です。
実際のSnowflakeでテーブルの有無を確認してみると、ユーザーのデフォルトのスキーマに対してロードされていることがわかります。
無事にロードが出来たら、今の状態をCommitしておきましょう
ロードしたデータをUNIONするmodelの定義・実行
続いて、dbt seed
によりロードした2つのデータをUNIONするmodelを定義していきます。
New File
から、名前をmodels/marts/core/intermediate/int_unioned_book.sql
に書き換えて、新しくファイルを作成します。
作成したint_unioned_book.sql
ファイルに対して、以下のクエリをコピーして貼り付け、右上のsave
を押します。
with unioned as ( {{ dbt_utils.union_relations( relations=[ref('manual_book1'), ref('manual_book2')] ) }} ), renamed as ( select Book, Date as book_date, Trader, Instrument, Action as book_action, Cost, Currency, Volume, Cost_Per_Share, Stock_exchange_name from unioned ) select * from renamed
ここで、一度Compile
を押してみてください。下図のようにUNIONが行われているクエリになっていることがわかると思います。
このUNION処理は、貼り付けたクエリのdbt_utils.union_relations()
マクロにより作られています。このマクロに対して、UNIONしたいテーブルを指定するだけで、カラム名や型を考慮し、自動でUNIONを行うクエリに変換してくれます。
最後に、このmodelを実行しておきましょう。
コマンドラインでdbt run -m int_unioned_book
と入力し、実行してください。
実行後にSnowflakeでも確認してみると、適切にテーブルが作られていることがわかると思います。
11. dbt pipelines - Intermediate Part 2
ここでは、また別の中間modelを2つ定義していきます。
内容として、株式の購入・売却の記録に関する取引活動のログ情報はすでにあるのですが、株式が保有されている間の情報を日次で保持し、よりパフォーマンスを計測・確認しやすくするためのデータ生成を行うmodelを定義していきます。
1つ目のmodel定義
まず1つ目のmodelを定義していきます。
New File
から、名前をmodels/marts/core/intermediate/int_daily_position.sql
に書き換えて、新しいファイルを作成します。
作成したら、下記のクエリをコピーして貼り付けて、右上のsave
を押します。
with stock_history as ( select * from {{ ref('int_stock_history_major_currency') }} ), unioned_book as ( select * from {{ ref('int_unioned_book') }} ), cst_market_days as ( select distinct stock_date from stock_history where stock_history.stock_date >= (select min(book_date) as min_dt from unioned_book) ), joined as ( select cst_market_days.stock_date, unioned_book.trader, unioned_book.stock_exchange_name, unioned_book.instrument, unioned_book.book, unioned_book.currency, sum(unioned_book.volume) as total_shares from cst_market_days inner join unioned_book on unioned_book.book_date = cst_market_days.stock_date where unioned_book.book_date <= cst_market_days.stock_date {{ dbt_utils.group_by(6) }} ) select * from joined
このクエリの途中で、dbt_utils.group_by
というマクロを使用しています。通常GROUP BY句を書くと、対象のカラム名を全て指定しないといけないのですが、このマクロを使用することでSELECT句内の上から順番にマクロの引数で指定した数だけカラム名を取得するので、クエリのコード量を削減することが出来ます。
2つ目のmodel定義
New File
から、名前をmodels/marts/core/intermediate/int_daily_position_with_trades.sql
に書き換えて、新しいファイルを作成します。
このint_daily_position_with_trades.sql
ファイルに対して下記のクエリをコピーして貼り付けて、右上のsave
を押してください。
with unioned_book as ( select * from {{ ref('int_unioned_book') }} ), daily_position as ( select * from {{ ref('int_daily_position') }} ), unioned as ( select book, book_date, trader, instrument, book_action, cost, currency, volume, cost_per_share, stock_exchange_name, sum(unioned_book.volume) over( partition by instrument, stock_exchange_name, trader order by unioned_book.book_date rows unbounded preceding) as total_shares from unioned_book union all select book, stock_date as book_date, trader, instrument, 'HOLD' as book_action, 0 as cost, currency, 0 as volume, 0 as cost_per_share, stock_exchange_name, total_shares from daily_position where (book_date,trader,instrument,book,stock_exchange_name) not in (select book_date,trader,instrument,book,stock_exchange_name from unioned_book ) ) select * from unioned
定義したmodelの実行
コマンドラインでdbt run --models int_unioned_book+
と入力して、これまでに作成したmodelを実行します。+
をmodel名の後ろにつけることで、対象のmodelの子に位置するmodelが順番にまとめて実行されます。
実行後、下図のように3つのmodelが実行されていればOKのです!
Commitと作成したデータへのクエリ
まずここまでの内容を一度Commitしておきましょう。私はset up intermediate model for daily performance analysis
と入れておきました。
この上で、新しいStatementを追加し、下記SQLを貼り付けてスキーマ名を自身のものに変更し、Previewを押してみてください。
select * from pc_dbt_db.<dev_schema>_marts.int_daily_position_with_trades where trader = 'Tina M.' order by book_date
下図のように、日々の売買実績の合計がTOTAL_SHARE
として確認できていればOKです!dbtでのデータ変換を通して、より実用性の高いデータを整えることが出来ました。
12. dbt pipelines - Facts
ここでは、最終的にBIツールなどで使用されるmodelを定義していきます。
11章までの作業で、取引履歴と株価の履歴に関するmodelを定義してきました。これらのmodelを活用して、Market ValueとPnL(損益のこと)が時間とともにどのように変化したかを示すモデルを作っていきましょう。
modelの定義
New File
を押し、名前をmodels/marts/core/fct_trading_pnl.sql
に書き換えて新しくファイルを作成します。
これまで中間modelの定義はmodels/marts/core/intermediate/
で行ってきましたが、このmodelはmodels/marts/core/
の直下に定義していきます。core
の名の通り、これはビジネス上で使用するコアとなるデータを出すためのロジックを定義しているmodelであるためです。
fct_trading_pnl.sql
ファイルが出来ましたら、下記のクエリをコピーして貼り付けて、右上のsave
を押します。
{{ config( tags = 'core' ) }} with daily_positions as ( select * from {{ ref('int_daily_position_with_trades' )}} ), stock_history as ( select * from {{ ref('int_stock_history_major_currency') }} ), joined as ( select daily_positions.instrument, daily_positions.stock_exchange_name, daily_positions.book_date, daily_positions.trader, daily_positions.volume, daily_positions.cost, daily_positions.cost_per_share, daily_positions.currency, sum(cost) over( partition by daily_positions.instrument, daily_positions.stock_exchange_name, trader order by daily_positions.book_date rows unbounded preceding ) as cash_cumulative, case when daily_positions.currency = 'GBP' then gbp_close when daily_positions.currency = 'EUR' then eur_close else 'Close' end AS close_price_matching_ccy, daily_positions.total_shares * close_price_matching_ccy as market_value, daily_positions.total_shares * close_price_matching_ccy + cash_cumulative as PnL from daily_positions inner join stock_history on daily_positions.instrument = stock_history.company_symbol and stock_history.stock_date = daily_positions.book_date and daily_positions.stock_exchange_name = stock_history.stock_exchange_name ) select * from joined
ファイルの保存ができたら、コマンドラインで実行してみましょう!
コマンドラインでdbt run --m fct_trading_pnl
と入力し、実行してみます。(公式ページ上のコマンドdbt run -m fct_trading_pnl.sql
ではファイルの拡張子まで含んでおり、正しく実行できません。)
増分更新を適用したmodelの定義
dbtでは中間modelを定義すること、テーブル作成時のクエリ時間を削減することが出来ます。
しかし、dbtでは増分更新を適用することが出来ます。これにより、modelが対象とするテーブルに対し追加されたレコードだけを追記していく処理になるため、都度テーブルの作り直しが発生しません。
dbtでの増分更新の説明と使い所については、下記の記事にまとまっていますので併せてご覧ください。
では、増分更新を適用させたmodelを作っていきましょう。
New File
を押し、名前をmodels/marts/core/fct_trading_pnl_incremental.sql
に書き換えて新しくファイルを作成します。
fct_trading_pnl_incremental.sql
ファイルが出来ましたら、下記のクエリをコピーして貼り付けて、右上のsave
を押します。
{{ config( materialized='incremental', unique_key= 'pk_key', tags = 'core' ) }} with daily_positions as ( select * from {{ ref('int_daily_position_with_trades' )}} ), stock_history as ( select * from {{ ref('int_stock_history_major_currency') }} ), joined as ( select daily_positions.instrument, daily_positions.stock_exchange_name, daily_positions.book_date, daily_positions.trader, daily_positions.volume, daily_positions.cost, daily_positions.cost_per_share, daily_positions.currency, sum(cost) over( partition by daily_positions.instrument, daily_positions.stock_exchange_name, trader order by daily_positions.book_date rows unbounded preceding ) as cash_cumulative, case when daily_positions.currency = 'GBP' then gbp_close when daily_positions.currency = 'EUR' then eur_close else 'Close' end as close_price_matching_ccy, daily_positions.total_shares * close_price_matching_ccy as market_value, daily_positions.total_shares * close_price_matching_ccy + cash_cumulative as PnL from daily_positions inner join stock_history on daily_positions.instrument = stock_history.company_symbol and stock_history.stock_date = daily_positions.book_date and daily_positions.stock_exchange_name = stock_history.stock_exchange_name ), primary_key as ( select {{ dbt_utils.surrogate_key([ 'trader', 'instrument', 'book_date', 'stock_exchange_name', 'PnL', ]) }} as pk_key, * from joined ) select * from primary_key {% if is_incremental() %} -- this filter will only be applied on an incremental run where book_date > (select max(book_date) from {{ this }}) {% endif %}
増分更新対応でどういった内容を変更したか少し触れておきます。
config
にunique_key
を追加しています。このキーの指定により、ターゲットテーブルの既存の行のユニークキーが、増分対象で変換されたレコードのいずれかと一致する場合、ターゲットテーブルの既存のレコードは更新されます。これにより、ソースデータ内の 1 つの行に対してターゲットテーブル内に複数の行が存在することがなくなります。
また、末尾のif is_incremental()
で囲まれているクエリについては、増分更新として実行されているときのみ実行されるようになります。増分更新の時に限り、book_date
がより新しい日付であるレコードだけに絞り込む、ということを行っています。
では、定義した増分更新のmodelを実行してみます!
コマンドラインでdbt run -m fct_trading_pnl_incremental
と入力し、実行してください。
この実行されたmodelをSnowflakeから確認すると、create or replace transient table
コマンドが実行されていることがわかると思います。初回だからテーブルを作っているんですね。
続いて、もう一度コマンドラインでdbt run -m fct_trading_pnl_incremental
と入力し、実行してください。
この上で、実行されたmodelをSnowflakeから確認すると、一度TEMPORARYテーブルを作成後、merge into
コマンドが実行されているのがわかります。2回目以降だから増分更新が適用されているということですね!
一般的に増分更新と聞くと単純にレコードが増加していくパターンでしか使えない場合も多いのですが、このようにdbtの増分更新はユニークキーを指定するだけで、新規レコードのINSERTだけでなく既存レコードのUPDATEも行ってくれます。個人的に、増分更新のクエリを書いたり処理を作るのは大変な印象が強かったのですが、dbtの増分更新はとても簡単で便利な機能だと感じています!!
次回
Snowflakeをより使いこなそう! Advent Calendar 2021、次回の23日目では、「Snowflake×dbtを試してみた~Part5:テスト&Doc&デプロイ編~」というタイトルで執筆します。お楽しみに!